
Conversation

@HardMax71 (Owner) commented Jan 27, 2026


Summary by cubic

Moved service startup and lifecycle management into DI and refactored workers/services into stateless handlers. Added Redis-backed repositories to replace in-memory managers, enabling horizontal scaling.

  • Refactors

    • Removed LifecycleEnabled and lifecycle-heavy services; converted Coordinator, K8sWorker, PodMonitor, ResultProcessor, Notification, DLQ, Saga, SSE bridge, and EventBus into stateless handlers.
    • DI now provides ready AIOKafkaProducer/Consumer; workers iterate the consumer directly.
    • Deleted QueueManager, ResourceManager, and EventStoreConsumer; removed SSE health endpoint and related code.
    • Simplified dishka_lifespan and providers to centralize init/start/stop in DI.
    • Updated tests to use AIOKafkaConsumer and new DI patterns; removed obsolete tests.
  • New Features

    • Added Redis-backed repositories: ExecutionQueueRepository, ExecutionStateRepository, PodStateRepository, ResourceRepository.
    • Scheduling and resource tracking moved to Redis for atomic ops and multi-instance safety.
    • EventBus exposed as a stateless service via DI for cross-instance pub/sub.

Written for commit 528aaa5. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Moved to stateless, dependency-injected services and simplified startup/shutdown across workers and services.
    • Replaced in-memory queue/state/resource managers with Redis-backed repositories for durable, scalable queuing, pod state, and resource allocation.
    • Event processing now uses lightweight handlers and Kafka consumer loops instead of lifecycle-managed background services.
  • Chores

    • Removed SSE health endpoint and trimmed legacy lifecycle helpers and factories.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai coderabbitai bot commented Jan 27, 2026

📝 Walkthrough

Removed lifecycle base and manual consumer lifecycles; converted many services to stateless, DI-provided handlers; replaced in-memory queue/state/resource managers with Redis-backed repositories; moved provider factories from generator-style lifecycles to direct instance provisioning; updated workers/tests to use explicit Kafka consumer loops and DI wiring.

Changes

Cohort / File(s) Summary
Core lifecycle & bootstrap
backend/app/core/lifecycle.py, backend/app/core/dishka_lifespan.py
Deleted LifecycleEnabled; simplified lifespan by removing AsyncExitStack and manual Kafka/SSE startup/teardown; container close now handles cleanup.
DI providers
backend/app/core/providers.py
Large provider rewrite: MessagingProvider → KafkaProvider; added RedisRepositoryProvider, K8sWorkerProvider, PodMonitorProvider, SagaOrchestratorProvider, CoordinatorProvider; many provider methods changed from async-yield factories to direct instance-returning methods and altered signatures for Kafka/producer/consumer wiring.
Redis repositories
backend/app/db/repositories/__init__.py, .../execution_queue_repository.py, .../execution_state_repository.py, .../pod_state_repository.py, .../resource_repository.py
New Redis-backed implementations: ExecutionQueueRepository, ExecutionStateRepository, PodStateRepository, ResourceRepository, plus enums/dataclasses (QueuePriority, QueueStats, PodState, ResourceAllocation, ResourceStats); replace prior in-memory queue/state/resource managers.
Event infra: producer/consumer/event-store
backend/app/events/core/producer.py, backend/app/events/core/consumer.py, backend/app/events/core/__init__.py, backend/app/events/event_store_consumer.py
UnifiedProducer and UnifiedConsumer simplified to stateless handlers consuming DI-provided AIOKafkaProducer/group_id; removed ProducerState/ConsumerState exports; deleted EventStoreConsumer module.
Coordinator & scheduling
backend/app/services/coordinator/coordinator.py, backend/app/services/coordinator/queue_manager.py (removed), backend/app/services/coordinator/resource_manager.py (removed), backend/app/services/coordinator/__init__.py
ExecutionCoordinator rewritten to use Redis repos and DI (public handle_* methods); removed in-process QueueManager and ResourceManager implementations and their tests/exports.
DLQ, EventBus, Notification, SSE
backend/app/dlq/manager.py, backend/app/services/event_bus.py, backend/app/services/notification_service.py, backend/app/services/sse/kafka_redis_bridge.py, backend/app/services/sse/sse_shutdown_manager.py, backend/app/services/sse/sse_service.py
Converted to stateless DI-driven services: DLQManager now exposes handle_dlq_message; EventBus simplified to publish/subscribe APIs and EventBusEvent; NotificationService and SSE components accept EventBus/redis bus and removed lifecycle factories/routers; type-annotation tweaks (dict→Dict).
Other services rewritten to DI handlers
backend/app/services/result_processor/processor.py, backend/app/services/saga/saga_orchestrator.py, backend/app/services/k8s_worker/worker.py, backend/app/services/pod_monitor/monitor.py, backend/app/services/user_settings_service.py, backend/app/services/kafka_event_service.py, backend/app/services/idempotency/middleware.py
Removed lifecycle inheritance and background loops; added public handler entrypoints (e.g., handle_event, handle_* methods), repository-backed state, and adjusted constructor signatures to accept DI-provided dependencies.
Worker entrypoints
backend/workers/... (run_*.py, dlq_processor.py)
Replaced long-running lifecycle services with DI-driven Kafka consumer loops that obtain AIOKafkaConsumer and UnifiedConsumer from container and iterate messages, committing per message; removed signal-based shutdown scaffolding.
Tests
backend/tests/... (e2e & unit changes across many files)
Tests updated to use DI-obtained instances and explicit AIOKafkaConsumer loops; removed tests for deleted modules (queue/resource managers, event_store_consumer, result processor E2E); introduced fake DI doubles in unit tests where appropriate; added redis marker to coordinator e2e.
Docs / plan
backend/di_lifecycle_refactor_plan.md
Added DI lifecycle refactor plan describing goals and migration steps.

Sequence Diagram(s)

mermaid
sequenceDiagram
participant Client
participant Coordinator as ExecutionCoordinator
participant Redis as RedisRepos
participant Producer as UnifiedProducer (Kafka)
Client->>Coordinator: ExecutionRequestedEvent
Coordinator->>Redis: enqueue / try_claim / allocate (ExecutionQueueRepository / ExecutionStateRepository / ResourceRepository)
Redis-->>Coordinator: position / claim result / allocation
Coordinator->>Producer: publish CreatePodCommand / ExecutionAccepted / ExecutionFailed
Producer-->>Kafka: send_and_wait
Coordinator-->>Client: (no direct reply; events emitted)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Poem

🐰 I twitch my whiskers, hop with glee,
Old lifecycles gone, DI sets me free.
Redis stores the seeds we sow,
Stateless handlers hop and flow,
A spring of changes — clean and spry! 🌱

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name: Docstring Coverage
Status: ⚠️ Warning
Explanation: Docstring coverage is 64.66%, which is insufficient. The required threshold is 80.00%.
Resolution: Write docstrings for the functions missing them to satisfy the coverage threshold.

Check name: Title check
Status: ❓ Inconclusive
Explanation: The title 'chore: moved init stuff to DI' is vague and does not clearly convey the scope or significance of the changes. It uses informal phrasing ('moved init stuff') rather than describing what was actually refactored.
Resolution: Consider a more descriptive title such as 'refactor: move service lifecycle management to DI providers' or 'refactor: eliminate LifecycleEnabled and centralize startup/shutdown in DI' to better reflect the substantial architectural changes.
✅ Passed checks (1 passed)
Check name: Description Check
Status: ✅ Passed
Explanation: Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@cubic-dev-ai cubic-dev-ai bot left a comment


14 issues found across 59 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/event_bus.py">

<violation number="1" location="backend/app/services/event_bus.py:138">
P2: Synchronous handlers run directly on the event loop, which can block async processing. Offload sync handlers to a thread executor to avoid stalling Kafka message handling.</violation>
</file>

<file name="backend/workers/run_pod_monitor.py">

<violation number="1" location="backend/workers/run_pod_monitor.py:41">
P2: Ensure cleanup runs even when the consumer loop is cancelled or errors. Wrap the loop in a try/finally so container.close executes on shutdown.</violation>
</file>

<file name="backend/app/db/repositories/resource_repository.py">

<violation number="1" location="backend/app/db/repositories/resource_repository.py:145">
P1: Validate resource requests are non-negative before allocation. Negative values would increase the available pool, allowing callers to mint resources.</violation>
</file>

<file name="backend/workers/run_coordinator.py">

<violation number="1" location="backend/workers/run_coordinator.py:42">
P2: Ensure container cleanup runs even if the consumer loop raises by wrapping the loop in a try/finally and moving `container.close()` into the finally block.</violation>
</file>

<file name="backend/workers/run_k8s_worker.py">

<violation number="1" location="backend/workers/run_k8s_worker.py:41">
P2: Wrap the consume loop in `try/finally` so the DI container is closed even when message handling raises, otherwise resources remain open and the worker can leak connections on failures.</violation>
</file>

<file name="backend/app/services/pod_monitor/monitor.py">

<violation number="1" location="backend/app/services/pod_monitor/monitor.py:187">
P2: Pod events are no longer persisted to the EventRepository before producing to Kafka. This removes the audit copy previously stored via KafkaEventService and can break consumers that rely on stored events.</violation>
</file>

<file name="backend/workers/run_event_replay.py">

<violation number="1" location="backend/workers/run_event_replay.py:33">
P2: On shutdown (cancellation), `await asyncio.Event().wait()` raises `CancelledError` and the container is never closed, leaking resources. Wrap the wait in a try/finally so `container.close()` always runs.</violation>
</file>

<file name="backend/app/events/core/consumer.py">

<violation number="1" location="backend/app/events/core/consumer.py:47">
P2: Guard against missing Kafka headers. This line will crash on messages produced without headers (producer sends `headers=None`). Use a fallback for `msg.headers` and decode only when a value is present.</violation>
</file>

<file name="backend/app/db/repositories/execution_queue_repository.py">

<violation number="1" location="backend/app/db/repositories/execution_queue_repository.py:72">
P2: Queue size and per-user limits are checked outside an atomic transaction, so concurrent enqueues can exceed the limits despite the checks.</violation>

<violation number="2" location="backend/app/db/repositories/execution_queue_repository.py:130">
P2: When queue data is missing, the user count is never decremented, which can permanently inflate per-user limits and block future enqueues.</violation>

<violation number="3" location="backend/app/db/repositories/execution_queue_repository.py:221">
P1: cleanup_stale will delete non-stale entries for priorities 0–9 because the threshold uses BACKGROUND’s priority offset, making the score range include all higher-priority items regardless of timestamp.</violation>
</file>

<file name="backend/workers/run_saga_orchestrator.py">

<violation number="1" location="backend/workers/run_saga_orchestrator.py:41">
P2: Cleanup is not guaranteed if message handling raises or the task is cancelled. Wrap the consumer loop in try/finally so container.close() always runs.</violation>
</file>

<file name="backend/app/services/coordinator/coordinator.py">

<violation number="1" location="backend/app/services/coordinator/coordinator.py:115">
P2: After cancellation, the coordinator doesn’t attempt to schedule the next queued execution. With the scheduling loop removed, this can leave the queue stalled until another event triggers scheduling. Consider calling `_try_schedule_next()` after cancellation cleanup.</violation>
</file>

<file name="backend/tests/e2e/test_k8s_worker_create_pod.py">

<violation number="1" location="backend/tests/e2e/test_k8s_worker_create_pod.py:23">
P2: Using the DI-managed worker ignores `test_settings.K8S_NAMESPACE`; the worker’s `K8sWorkerConfig` namespace comes from the environment, so the test may create resources in a different namespace than the one it asserts/cleans up. This can lead to flaky failures or leaked resources when test settings differ from the env.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

            requested_memory_mb = requested_memory_mb or default_memory

        # Apply limits
        requested_cpu = min(requested_cpu, self._max_cpu_per_exec)

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P1: Validate resource requests are non-negative before allocation. Negative values would increase the available pool, allowing callers to mint resources.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/db/repositories/resource_repository.py, line 145:

<comment>Validate resource requests are non-negative before allocation. Negative values would increase the available pool, allowing callers to mint resources.</comment>

<file context>
@@ -0,0 +1,300 @@
+            requested_memory_mb = requested_memory_mb or default_memory
+
+        # Apply limits
+        requested_cpu = min(requested_cpu, self._max_cpu_per_exec)
+        requested_memory_mb = min(requested_memory_mb, self._max_memory_per_exec)
+
</file context>

if asyncio.iscoroutinefunction(handler):
    await handler(event)
else:
    handler(event)  # type: ignore[operator]

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P2: Synchronous handlers run directly on the event loop, which can block async processing. Offload sync handlers to a thread executor to avoid stalling Kafka message handling.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/event_bus.py, line 138:

<comment>Synchronous handlers run directly on the event loop, which can block async processing. Offload sync handlers to a thread executor to avoid stalling Kafka message handling.</comment>

<file context>
@@ -35,313 +39,102 @@ class Subscription:
+                if asyncio.iscoroutinefunction(handler):
+                    await handler(event)
+                else:
+                    handler(event)  # type: ignore[operator]
+            except Exception as e:
+                self._logger.error(f"Handler failed for {event.event_type}: {e}")
</file context>
Suggested change
-                    handler(event)  # type: ignore[operator]
+                    await asyncio.to_thread(handler, event)

Comment on lines +41 to +47
    async for msg in kafka_consumer:
        await handler.handle(msg)
        await kafka_consumer.commit()

    logger.info("PodMonitor shutdown complete")

    await container.close()

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P2: Ensure cleanup runs even when the consumer loop is cancelled or errors. Wrap the loop in a try/finally so container.close executes on shutdown.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_pod_monitor.py, line 41:

<comment>Ensure cleanup runs even when the consumer loop is cancelled or errors. Wrap the loop in a try/finally so container.close executes on shutdown.</comment>

<file context>
@@ -29,27 +33,18 @@ async def run_pod_monitor(settings: Settings) -> None:
+
+    logger.info("PodMonitor started, consuming events...")
+
+    async for msg in kafka_consumer:
+        await handler.handle(msg)
+        await kafka_consumer.commit()
</file context>
Suggested change
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-    logger.info("PodMonitor shutdown complete")
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        logger.info("PodMonitor shutdown complete")
+        await container.close()

Comment on lines +42 to +48
    async for msg in kafka_consumer:
        await handler.handle(msg)
        await kafka_consumer.commit()

    logger.info("ExecutionCoordinator shutdown complete")

    await container.close()

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P2: Ensure container cleanup runs even if the consumer loop raises by wrapping the loop in a try/finally and moving container.close() into the finally block.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_coordinator.py, line 42:

<comment>Ensure container cleanup runs even if the consumer loop raises by wrapping the loop in a try/finally and moving `container.close()` into the finally block.</comment>

<file context>
@@ -27,27 +34,18 @@ async def run_coordinator(settings: Settings) -> None:
+
+    logger.info("ExecutionCoordinator started, consuming events...")
+
+    async for msg in kafka_consumer:
+        await handler.handle(msg)
+        await kafka_consumer.commit()
</file context>
Suggested change
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-    logger.info("ExecutionCoordinator shutdown complete")
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        logger.info("ExecutionCoordinator shutdown complete")
+        await container.close()

+    logger.info("SagaOrchestrator started, consuming events...")

-    logger.info("Saga orchestrator started and running")
+    async for msg in kafka_consumer:

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P2: Cleanup is not guaranteed if message handling raises or the task is cancelled. Wrap the consumer loop in try/finally so container.close() always runs.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_saga_orchestrator.py, line 41:

<comment>Cleanup is not guaranteed if message handling raises or the task is cancelled. Wrap the consumer loop in try/finally so container.close() always runs.</comment>

<file context>
@@ -27,27 +33,18 @@ async def run_saga_orchestrator(settings: Settings) -> None:
+    logger.info("SagaOrchestrator started, consuming events...")
 
-    logger.info("Saga orchestrator started and running")
+    async for msg in kafka_consumer:
+        await handler.handle(msg)
+        await kafka_consumer.commit()
</file context>

-            del self._execution_resources[execution_id]
+        # Update metrics
+        count = await self._state_repo.get_active_count()
+        self._metrics.update_coordinator_active_executions(count)

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P2: After cancellation, the coordinator doesn’t attempt to schedule the next queued execution. With the scheduling loop removed, this can leave the queue stalled until another event triggers scheduling. Consider calling _try_schedule_next() after cancellation cleanup.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/coordinator/coordinator.py, line 115:

<comment>After cancellation, the coordinator doesn’t attempt to schedule the next queued execution. With the scheduling loop removed, this can leave the queue stalled until another event triggers scheduling. Consider calling `_try_schedule_next()` after cancellation cleanup.</comment>

<file context>
@@ -20,391 +29,219 @@
-            del self._execution_resources[execution_id]
+        # Update metrics
+        count = await self._state_repo.get_active_count()
+        self._metrics.update_coordinator_active_executions(count)
 
-        self._active_executions.discard(execution_id)
</file context>
Suggested change
-        self._metrics.update_coordinator_active_executions(count)
+        self._metrics.update_coordinator_active_executions(count)
+        await self._try_schedule_next()

-    idem: IdempotencyManager = await scope.get(IdempotencyManager)
-    event_metrics: EventMetrics = await scope.get(EventMetrics)
+    # Get worker from DI (already configured with dependencies)
+    worker: KubernetesWorker = await scope.get(KubernetesWorker)

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026


P2: Using the DI-managed worker ignores test_settings.K8S_NAMESPACE; the worker’s K8sWorkerConfig namespace comes from the environment, so the test may create resources in a different namespace than the one it asserts/cleans up. This can lead to flaky failures or leaked resources when test settings differ from the env.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/e2e/test_k8s_worker_create_pod.py, line 23:

<comment>Using the DI-managed worker ignores `test_settings.K8S_NAMESPACE`; the worker’s `K8sWorkerConfig` namespace comes from the environment, so the test may create resources in a different namespace than the one it asserts/cleans up. This can lead to flaky failures or leaked resources when test settings differ from the env.</comment>

<file context>
@@ -25,27 +19,10 @@ async def test_worker_creates_configmap_and_pod(
-    idem: IdempotencyManager = await scope.get(IdempotencyManager)
-    event_metrics: EventMetrics = await scope.get(EventMetrics)
+    # Get worker from DI (already configured with dependencies)
+    worker: KubernetesWorker = await scope.get(KubernetesWorker)
 
-    cfg = K8sWorkerConfig(namespace=ns, max_concurrent_pods=1)
</file context>


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 17

Note

Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
backend/app/services/idempotency/middleware.py (1)

50-56: Type mismatch: key_strategy is str but IdempotencyManager expects KeyStrategy enum.

The pipeline failures confirm that IdempotencyManager.check_and_reserve(), mark_completed(), and mark_failed() expect KeyStrategy enum, not str. Either:

  1. Update IdempotencyManager to accept str, or
  2. Keep KeyStrategy enum in this module
🔧 Option 1: Convert string to enum at call sites
+from app.domain.idempotency import KeyStrategy
+
         # Check idempotency
         idempotency_result = await self.idempotency_manager.check_and_reserve(
             event=event,
-            key_strategy=self.key_strategy,
+            key_strategy=KeyStrategy(self.key_strategy),
             custom_key=custom_key,
             ttl_seconds=self.ttl_seconds,
             fields=self.fields,
         )

Apply similar changes to mark_completed() (line 81) and mark_failed() (line 87).

backend/app/services/notification_service.py (1)

350-369: Throttle exemption is ineffective and non-exempt notifications get double-counted.

_create_system_for_user checks throttling and then calls create_notification, which throttles again. Also, throttle_exempt=True still gets throttled in create_notification.

🛠️ Suggested fix
-    async def create_notification(
+    async def create_notification(
             self,
             user_id: str,
             subject: str,
             body: str,
             tags: list[str],
             severity: NotificationSeverity = NotificationSeverity.MEDIUM,
             channel: NotificationChannel = NotificationChannel.IN_APP,
             scheduled_for: datetime | None = None,
             action_url: str | None = None,
-            metadata: dict[str, object] | None = None,
+            metadata: dict[str, object] | None = None,
+            skip_throttle: bool = False,
     ) -> DomainNotification:
         """Create a new notification."""
         if not tags:
             raise NotificationValidationError("tags must be a non-empty list")
@@
-        if await self._throttle_cache.check_throttle(
+        if not skip_throttle and await self._throttle_cache.check_throttle(
                 user_id,
                 severity,
                 window_hours=self._settings.NOTIF_THROTTLE_WINDOW_HOURS,
                 max_per_hour=self._settings.NOTIF_THROTTLE_MAX_PER_HOUR,
         ):
@@
-            if not cfg.throttle_exempt:
-                throttled = await self._throttle_cache.check_throttle(
-                    user_id,
-                    cfg.severity,
-                    window_hours=self._settings.NOTIF_THROTTLE_WINDOW_HOURS,
-                    max_per_hour=self._settings.NOTIF_THROTTLE_MAX_PER_HOUR,
-                )
-                if throttled:
-                    return "throttled"
-
             await self.create_notification(
                 user_id=user_id,
                 subject=title,
                 body=str(base_context.get("message", "Alert")),
                 severity=cfg.severity,
                 tags=tags,
                 channel=NotificationChannel.IN_APP,
                 metadata=base_context,
+                skip_throttle=cfg.throttle_exempt,
             )
             return "created"
+        except NotificationThrottledError:
+            return "throttled"
backend/tests/e2e/test_k8s_worker_create_pod.py (1)

47-71: Ensure cleanup runs even if the test fails.

If pod creation or assertions raise, cleanup won’t run and resources can leak.

🧹 Suggested fix
-    pod = worker._pod_builder.build_pod_manifest(cmd)  # noqa: SLF001
-    await worker._create_pod(pod)  # noqa: SLF001
-
-    # Verify resources exist
-    got_cm = worker._v1.read_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
-    assert got_cm is not None
-    got_pod = worker._v1.read_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
-    assert got_pod is not None
-
-    # Cleanup
-    worker._v1.delete_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
-    worker._v1.delete_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
+    pod = worker._pod_builder.build_pod_manifest(cmd)  # noqa: SLF001
+    try:
+        await worker._create_pod(pod)  # noqa: SLF001
+
+        # Verify resources exist
+        got_cm = worker._v1.read_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
+        assert got_cm is not None
+        got_pod = worker._v1.read_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
+        assert got_pod is not None
+    finally:
+        if worker._v1 is not None:  # noqa: SLF001
+            try:
+                worker._v1.delete_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
+            except ApiException as e:
+                if e.status != 404:
+                    raise
+            try:
+                worker._v1.delete_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
+            except ApiException as e:
+                if e.status != 404:
+                    raise
backend/app/services/result_processor/processor.py (1)

96-104: Fix ExecutionResultDomain metadata type (MyPy failure).
metadata expects EventMetadata | None, but model_dump() produces a dict. Either pass the EventMetadata instance directly or widen the domain type.

🔧 Suggested fix
-            metadata=event.metadata.model_dump(),
+            metadata=event.metadata,
@@
-            metadata=event.metadata.model_dump(),
+            metadata=event.metadata,
@@
-            metadata=event.metadata.model_dump(),
+            metadata=event.metadata,

Also applies to: 123-131, 154-162

🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around line 131-170: The provider currently hardcodes limits in
RedisRepositoryProvider; update get_execution_queue_repository and
get_resource_repository to read max_queue_size, max_executions_per_user,
total_cpu_cores, and total_memory_mb from the Settings argument instead of
hardcoded literals; add those fields with sensible defaults to the Settings
class (e.g., max_queue_size, max_executions_per_user, total_cpu_cores,
total_memory_mb), then pass settings.max_queue_size and
settings.max_executions_per_user into ExecutionQueueRepository and
settings.total_cpu_cores and settings.total_memory_mb into ResourceRepository
and keep existing initialize() call.

In `@backend/app/db/repositories/execution_queue_repository.py`:
- Around line 218-229: The cleanup_stale method computes a single huge
threshold_score that causes all non-BACKGROUND priorities to be considered
stale; update cleanup_stale to iterate over each QueuePriority and for each
priority compute min_score = priority.value * 1e12 and max_score =
priority.value * 1e12 + (time.time() - self.stale_timeout_seconds) (or
alternatively fetch and check the stored _enqueue_timestamp in the entry hash)
then call zrangebyscore with that per-priority range, decode entries and call
self.remove(execution_id) as before; ensure you still limit batch size (num=100)
and aggregate removed count.
- Around line 71-101: The current enqueue logic performs zcard and hincrby(...,
0) checks outside the pipeline which allows race conditions; change the
implementation in the enqueue routine to perform the queue-size and
per-user-count checks and the zadd + data hset/hincrby/expire atomically by
moving them into a single Redis Lua script (or by using WATCH on QUEUE_KEY and
USER_COUNT_KEY then MULTI/EXEC). Specifically, include the zcard(QUEUE_KEY) and
hincrby(USER_COUNT_KEY, user_id, 0) checks, the zadd(QUEUE_KEY, execution_id,
score), the creation of data_key and its hset/expire, and the
hincrby(USER_COUNT_KEY, user_id, 1) increment inside the atomic operation so the
old pipeline usage (pipe.zadd, pipe.hset, pipe.expire, pipe.hincrby) is executed
only from within the Lua script or guarded MULTI after WATCH; ensure the script
returns appropriate failure/success values (matching the existing
False/None/"Queue is full" or limit-exceeded responses) so the calling code can
handle errors the same way.
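A minimal sketch of the atomic version this prompt describes, using a single Lua script so the queue-size check, per-user check, and writes all happen in one step. Key names, the score encoding, and the return codes are illustrative assumptions taken from the comments above, not the repository's actual layout.

ENQUEUE_LUA = """
local queue_key      = KEYS[1]
local user_count_key = KEYS[2]
local data_key       = KEYS[3]

local execution_id = ARGV[1]
local user_id      = ARGV[2]
local score        = tonumber(ARGV[3])
local payload      = ARGV[4]
local max_queue    = tonumber(ARGV[5])
local max_per_user = tonumber(ARGV[6])
local ttl          = tonumber(ARGV[7])

if redis.call('ZCARD', queue_key) >= max_queue then
    return 'QUEUE_FULL'
end
if tonumber(redis.call('HGET', user_count_key, user_id) or '0') >= max_per_user then
    return 'USER_LIMIT'
end

redis.call('ZADD', queue_key, score, execution_id)
redis.call('HSET', data_key, 'payload', payload)
redis.call('EXPIRE', data_key, ttl)
redis.call('HINCRBY', user_count_key, user_id, 1)
return 'OK'
"""

async def enqueue_atomic(redis_client, *, execution_id: str, user_id: str,
                         score: float, payload: str,
                         max_queue: int, max_per_user: int, ttl: int = 3600) -> str:
    # All checks and writes execute atomically inside Redis.
    result = await redis_client.eval(
        ENQUEUE_LUA, 3,
        "exec:queue", "exec:queue:user_counts", f"exec:queue:data:{execution_id}",
        execution_id, user_id, score, payload, max_queue, max_per_user, ttl,
    )
    return result.decode() if isinstance(result, bytes) else str(result)

Callers can then map 'QUEUE_FULL' / 'USER_LIMIT' back onto the existing "Queue is full" and per-user-limit responses.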

In `@backend/app/db/repositories/resource_repository.py`:
- Around line 216-236: The release method has a race where it reads the
allocation then updates the pool separately; change release to perform the read,
increment and delete atomically via a Redis Lua script (similar to allocate()):
create a script that takes KEYS[1]=self.POOL_KEY and KEYS[2]=alloc_key, reads
'cpu','memory','gpu' from the alloc hash (coerce to numbers, default 0), returns
0 if all zero, otherwise calls HINCRBYFLOAT/HINCRBY on the pool fields and DEL
on the alloc key and returns 1; in release() call that script with the two keys
(using self._redis.eval or the client script API), interpret the numeric return
as the boolean result and remove the existing pipeline-based logic and prior
hgetall read. Ensure you reference ALLOC_KEY_PREFIX when building alloc_key and
keep behavior of logging/returning False when script returns 0.
- Around line 117-193: The allocate method currently does a non-atomic
Python-side existence check before running the Lua script; move that existence
check into the Lua script by testing if alloc_key exists (redis.call('EXISTS',
alloc_key)) at the top of lua_script and return a distinct code (e.g., 2) when
it does, keep returning 0 for insufficient resources and 1 for successful
allocation; then update the Python allocate method (the result handling after
self._redis.eval) to treat result == 2 by reading the existing allocation from
alloc_key and returning a ResourceAllocation built from those fields (same
parsing logic used earlier), otherwise proceed as before for success/failure.
- Around line 174-176: alloc_key entries are being set with a 7200s TTL but pool
resource counters (updated in the allocation code around alloc_key and the pool
keys) are never restored if release() isn't called, causing permanent capacity
leaks; add a reconciliation background task (e.g., reclaim_expired_allocations
or reconcile_expired_allocations) that periodically scans redis for keys
matching "alloc_key:*", detects expired or missing allocation metadata and
atomically restores the pool counters (cpu, memory, gpu) by incrementing the
corresponding pool hash fields, using Redis transactions or a Lua script to
avoid races, and mark or remove reclaimed alloc_key entries to make reclamation
idempotent; alternatively add TTLs to pool keys and implement a periodic
reconcile_pools task that verifies sums against active allocations and fixes
discrepancies—update resource_repository functions (alloc_key handling and
release()) to cooperate with the new reconciler.
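For the first item in this file (atomic release), a rough Lua-based sketch; the pool key, allocation key prefix, and hash field names mirror the wording above and are assumptions, not the repository's actual schema.

RELEASE_LUA = """
local pool_key  = KEYS[1]
local alloc_key = KEYS[2]

local cpu    = tonumber(redis.call('HGET', alloc_key, 'cpu') or '0')
local memory = tonumber(redis.call('HGET', alloc_key, 'memory') or '0')
local gpu    = tonumber(redis.call('HGET', alloc_key, 'gpu') or '0')

if cpu == 0 and memory == 0 and gpu == 0 then
    return 0
end

redis.call('HINCRBYFLOAT', pool_key, 'cpu', cpu)
-- HINCRBY assumes memory/gpu are stored as integers
redis.call('HINCRBY', pool_key, 'memory', memory)
redis.call('HINCRBY', pool_key, 'gpu', gpu)
redis.call('DEL', alloc_key)
return 1
"""

async def release(redis_client, execution_id: str) -> bool:
    # Read, restore, and delete happen in one atomic script invocation.
    result = await redis_client.eval(
        RELEASE_LUA, 2,
        "resources:pool", f"resources:alloc:{execution_id}",
    )
    return bool(result)

The allocate-side EXISTS guard described in the second item fits the same pattern: test the allocation key inside the script and return a distinct code for "already allocated".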

In `@backend/app/services/coordinator/coordinator.py`:
- Around line 194-204: The current flow dequeues an execution and, on allocation
failure, removes its claim and requeues it via self._state_repo.remove and
self._queue_repo.enqueue, which loses original queue position and can cause
busy-loops; change this by checking resource availability before dequeuing (use
or add a can_allocate or resources_available check in the coordinator) or move
the execution into a "waiting for resources" state instead of re-enqueueing at
the tail, and if you must requeue implement exponential/backoff delay before
calling self._queue_repo.enqueue or mark the item with a waiting flag so dequeue
logic preserves position and avoids tight requeue cycles (update the code paths
around dequeue, allocation, allocation, _state_repo.remove, and
_queue_repo.enqueue accordingly).
- Around line 97-100: handle_execution_requested currently logs exceptions and
records metrics but does not notify downstream systems; update the except block
in handle_execution_requested to call self._publish_scheduling_failed(event,
str(e)) before/after recording the metric so an ExecutionFailedEvent is
published (mirror the behavior in _schedule_execution), keeping the existing
self._logger.error and
self._metrics.record_coordinator_execution_scheduled("error") calls.
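A rough shape of the "check before dequeue" idea from the first item; resources_available, _default_cpu, and _default_memory_mb are assumed names, not the coordinator's actual API. The second item amounts to calling await self._publish_scheduling_failed(event, str(e)) inside the existing except block, mirroring _schedule_execution.

async def _try_schedule_next(self) -> None:
    # Only pull from the queue when the pool can host another execution,
    # so the item keeps its position instead of being re-enqueued at the tail.
    if not await self._resource_repo.resources_available(
        cpu=self._default_cpu, memory_mb=self._default_memory_mb
    ):
        return

    queued = await self._queue_repo.dequeue()
    if queued is None:
        return
    await self._schedule_execution(queued)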

In `@backend/app/services/event_bus.py`:
- Around line 74-82: The current try/except around self._producer.send_and_wait
in event_bus (the block catching Exception as e) only logs failures and swallows
the exception; update the error-handling so callers receive failure info —
either re-raise the caught exception after logging or change the method
signature to return a boolean and return False on failure (and True on success);
locate the send_and_wait call and its except Exception as e handler in the
publish/publish_event method, add the chosen behavior (re-raise e after
self._logger.error(...) or return False), and ensure successful paths return
True if using the success indicator approach.
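One possible shape for the failure contract (returning a success flag; re-raising after logging is the other option). _topic and _serialize are placeholders for the service's existing serialization path, so this is a sketch only.

async def publish(self, event: EventBusEvent) -> bool:
    try:
        await self._producer.send_and_wait(self._topic, self._serialize(event))
        return True
    except Exception as e:
        self._logger.error(f"Failed to publish {event.event_type}: {e}")
        return False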

In `@backend/app/services/notification_service.py`:
- Around line 170-174: The truncation code assumes event_data["stdout"] and
["stderr"] are strings but they can be None; update the logic around event_data
(produced by event.model_dump) to guard against None before slicing—e.g.,
replace direct slicing with a safe truncation that uses a default empty string
when the key is None or missing, and apply the same guard for both "stdout" and
"stderr" to prevent TypeError in NotificationService (look for event_data,
"stdout", and "stderr" usage in notification_service.py).

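For the stdout/stderr truncation above, a small None-safe helper illustrating the guard; the 1000-character limit is a placeholder, not the service's actual value.

def _safe_excerpt(value: object, limit: int = 1000) -> str:
    # Treat None / missing output as an empty string instead of slicing None.
    return ("" if value is None else str(value))[:limit]

stdout = _safe_excerpt(event_data.get("stdout"))
stderr = _safe_excerpt(event_data.get("stderr"))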
In `@backend/tests/unit/services/pod_monitor/test_monitor.py`:
- Line 15: Replace the incorrect import of ResourceUsageDomain in
test_monitor.py: locate the module that actually defines/exports
ResourceUsageDomain (instead of app.domain.execution.models) and import
ResourceUsageDomain from that module (update the import statement that currently
references ResourceUsageDomain). Ensure the import uses the exact exported
symbol name ResourceUsageDomain so MyPy recognizes it.

In `@backend/tests/unit/services/sse/test_sse_service.py`:
- Around line 13-14: The import for ResourceUsageDomain is incorrect: update the
import line that currently pulls DomainExecution and ResourceUsageDomain from
app.domain.execution so that DomainExecution remains imported from
app.domain.execution and ResourceUsageDomain is imported from
app.domain.events.typed; specifically adjust the import statement referencing
DomainExecution and ResourceUsageDomain in test_sse_service.py so
DomainExecution stays from app.domain.execution and ResourceUsageDomain is
imported from app.domain.events.typed (keeping SSEExecutionStatusDomain and
ShutdownStatus as-is).

In `@backend/workers/run_coordinator.py`:
- Around line 37-48: The async consumer loop in ExecutionCoordinator using
kafka_consumer and handler can raise exceptions which skip cleanup; wrap the
async for msg in kafka_consumer loop in a try/finally so that await
container.close() (and any kafka_consumer commit/close if needed) always runs,
move the "ExecutionCoordinator shutdown complete" log into the finally block,
and ensure resource-specific cleanup for AIOKafkaConsumer and UnifiedConsumer
happens before closing the container (referencing kafka_consumer, handler,
container, AIOKafkaConsumer, UnifiedConsumer).

In `@backend/workers/run_event_replay.py`:
- Around line 32-35: The forever-blocking await asyncio.Event().wait() prevents
container.close() from ever running; replace the anonymous Event with a named
asyncio.Event (e.g., shutdown_event), register signal handlers for
SIGINT/SIGTERM that set shutdown_event (using loop.add_signal_handler or
signal.signal fallback), await shutdown_event.wait() in run_event_replay.py,
then call await container.close() after the wait so the container is closed
during graceful shutdown; reference the existing asyncio.Event().wait() and
container.close() calls when making the change.
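A sketch of the suggested shutdown wiring; wait_for_shutdown is an illustrative name and container is the DI container the worker already builds.

import asyncio
import signal

async def wait_for_shutdown(container) -> None:
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown_event.set)
    try:
        await shutdown_event.wait()
    finally:
        # Runs on normal shutdown and on cancellation alike.
        await container.close()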

In `@backend/workers/run_k8s_worker.py`:
- Around line 36-47: The consumer loop can raise in handler.handle() or
kafka_consumer.commit() and skip cleanup; wrap the async for msg in
kafka_consumer loop in a try/finally so that await container.close() (and any
other shutdown steps) always run. Specifically, enclose the async for ...: await
handler.handle(msg); await kafka_consumer.commit() inside try: ... finally:
await container.close() and keep the logger.info("KubernetesWorker shutdown
complete") inside the finally; apply the same pattern to the other worker
modules (run_result_processor.py, run_saga_orchestrator.py, run_coordinator.py,
run_pod_monitor.py) referencing kafka_consumer, UnifiedConsumer.handle,
kafka_consumer.commit, and container.close where present.

In `@backend/workers/run_result_processor.py`:
- Around line 36-47: Wrap the consumer loop in a try/finally so cleanup always
runs: start by acquiring kafka_consumer (AIOKafkaConsumer) and handler
(UnifiedConsumer) as before, then run the async for msg in kafka_consumer /
await handler.handle(msg) loop inside a try block; in the finally block ensure
you stop/close the kafka_consumer (call kafka_consumer.stop() or equivalent) and
then await container.close(); re-raise cancellation errors if needed so signal
handling still works. This guarantees the consumer and container are closed even
if handler.handle() raises or the task is cancelled.

In `@backend/workers/run_saga_orchestrator.py`:
- Around line 41-47: Add graceful shutdown and per-message error resilience
around the kafka_consumer loop: install asyncio signal handlers for
SIGINT/SIGTERM that set an asyncio.Event (e.g., shutdown_event) and make the
async for msg in kafka_consumer loop respect that event so it stops cleanly;
wrap message processing so each iteration calls await handler.handle(msg) inside
a try/except that logs the exception via logger.error (including exception
details) and continues (optionally still commit offsets on successful processing
via kafka_consumer.commit), and wrap the whole consumer loop in try/finally to
ensure await container.close() and any kafka_consumer stop/close are always
awaited on shutdown. Reference symbols: kafka_consumer, handler.handle,
kafka_consumer.commit, container.close, logger.
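Putting the three suggestions together in one sketch: signal-driven shutdown, per-message error isolation, and guaranteed cleanup. kafka_consumer, handler, container, and logger are the objects the worker already obtains from DI; the function name is illustrative.

import asyncio
import signal

async def consume_until_shutdown(kafka_consumer, handler, container, logger) -> None:
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown_event.set)

    try:
        async for msg in kafka_consumer:
            if shutdown_event.is_set():
                break
            try:
                await handler.handle(msg)
                await kafka_consumer.commit()  # commit only after successful handling
            except Exception:
                logger.exception("Failed to process message; continuing")
    finally:
        logger.info("SagaOrchestrator shutdown complete")
        await container.close()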
🟡 Minor comments (9)
backend/tests/e2e/idempotency/test_consumer_idempotent.py-81-97 (1)

81-97: Test bypasses IdempotentConsumerWrapper - idempotency not exercised.

The IdempotentConsumerWrapper is instantiated but the consume loop calls handler.handle(msg) directly (line 94), bypassing the wrapper entirely. The test verifies that seen["n"] >= 1 but doesn't verify duplicate blocking.

The wrapper's _wrap_handlers() wraps handlers in the dispatcher, but handler.handle() dispatches through the dispatcher, which should work. However, the assertion seen["n"] >= 1 doesn't verify that duplicates were blocked (should be exactly 1 if idempotency works).

💡 Suggested assertion fix to verify idempotency
-        assert seen["n"] >= 1
+        # Both messages produced were duplicates (same event_id), so idempotency should block one
+        assert seen["n"] == 1, f"Expected exactly 1 (duplicates blocked), got {seen['n']}"
backend/workers/run_pod_monitor.py-36-47 (1)

36-47: Guarantee cleanup on handler failures.
If handler.handle() or kafka_consumer.commit() raises an exception, execution skips the container.close() call. Wrap the loop with try/finally to ensure shutdown always runs.

🔧 Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("PodMonitor shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        logger.info("PodMonitor shutdown complete")
+        await container.close()
backend/tests/e2e/events/test_consumer_lifecycle.py-21-45 (1)

21-45: Ensure partitions are assigned before calling seek operations.

After await kafka_consumer.start() (line 37), assignment() may return empty because partition assignment happens asynchronously via group coordination. This causes the test to become a no-op—the if assignment: guard prevents the seek calls from executing.

Add a call to trigger assignment and assert it's non-empty:

🔧 Suggested fix
     await kafka_consumer.start()

     try:
         # Exercise seek functions on AIOKafkaConsumer directly
+        await kafka_consumer.getmany(timeout_ms=1000)
         assignment = kafka_consumer.assignment()
-        if assignment:
-            await kafka_consumer.seek_to_beginning(*assignment)
-            await kafka_consumer.seek_to_end(*assignment)
+        assert assignment, "Expected partition assignment"
+        await kafka_consumer.seek_to_beginning(*assignment)
+        await kafka_consumer.seek_to_end(*assignment)
backend/app/db/repositories/pod_state_repository.py-138-145 (1)

138-145: Add error handling for malformed datetime strings.

If Redis contains corrupted data, datetime.fromisoformat() will raise a ValueError. Consider adding defensive handling.

Proposed fix
+        try:
+            created_at = datetime.fromisoformat(get_str("created_at"))
+            updated_at = datetime.fromisoformat(get_str("updated_at"))
+        except ValueError as e:
+            self._logger.warning(f"Invalid datetime in pod state for {pod_name}: {e}")
+            return None
+
         return PodState(
             pod_name=get_str("pod_name"),
             execution_id=get_str("execution_id"),
             status=get_str("status"),
-            created_at=datetime.fromisoformat(get_str("created_at")),
-            updated_at=datetime.fromisoformat(get_str("updated_at")),
+            created_at=created_at,
+            updated_at=updated_at,
             metadata=metadata,
         )
backend/app/services/event_bus.py-111-120 (1)

111-120: Local subscribers don't receive events published by the same instance.

The handle_kafka_message filters out messages from its own instance (source_instance == self._instance_id), which prevents feedback loops in cross-instance pub/sub. However, this means the publish method only distributes to Kafka—local subscribers never receive events published within the same instance.

If local subscribers should receive locally-published events, add local distribution in the publish method before sending to Kafka.
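If local delivery is wanted, the change is mostly about call order; _dispatch_local and _publish_to_kafka stand in for whatever dispatch/produce logic the service already has, so this is only a sketch of the intended behavior.

async def publish(self, event: EventBusEvent) -> None:
    # Local subscribers first, so an instance also sees its own events...
    await self._dispatch_local(event)
    # ...then Kafka for cross-instance delivery, filtered out on receipt as today.
    await self._publish_to_kafka(event)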

backend/app/events/core/producer.py-54-54 (1)

54-54: Use get_topic_for_event() helper or handle missing mappings defensively.

Direct dict access at line 54 will raise KeyError for unmapped EventType values. NOTIFICATION_ALL_READ is currently unmapped and defined in the EventType enum but will fail if produced. A defensive helper function get_topic_for_event() already exists in the mappings module with fallback to SYSTEM_EVENTS topic—use it instead of direct dict access, or add explicit error handling for missing types.

Recommended fix
-from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC
+from app.infrastructure.kafka.mappings import get_topic_for_event

Then at line 54:

-        topic = f"{self._topic_prefix}{EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]}"
+        topic = f"{self._topic_prefix}{get_topic_for_event(event_to_produce.event_type)}"
backend/app/services/pod_monitor/monitor.py-94-115 (1)

94-115: Harden malformed event handling.
If raw_event["object"] is None or not pod-like, AttributeError/TypeError can escape and break the worker loop. Consider widening the catch.

🩹 Suggested tweak
-        except (KeyError, ValueError) as e:
+        except (KeyError, ValueError, TypeError, AttributeError) as e:
             self._logger.error(f"Invalid event format: {e}")
backend/app/services/saga/saga_orchestrator.py-391-402 (1)

391-402: Same silent exception swallowing issue.

This has the same issue as in _start_saga - the bare except Exception: pass at lines 401-402 silently ignores errors from bind_dependencies. Apply the same fix to log the warning.

🐛 Proposed fix
                     try:
                         saga.bind_dependencies(
                             producer=self._producer,
                             alloc_repo=self._alloc_repo,
                             publish_commands=bool(getattr(self._config, "publish_commands", False)),
                         )
-                    except Exception:
-                        pass
+                    except Exception as e:
+                        self._logger.warning(f"Failed to bind dependencies for saga cancellation {saga_instance.saga_name}: {e}")
backend/app/services/saga/saga_orchestrator.py-204-212 (1)

204-212: Silent exception swallowing hides potential configuration errors.

The bare except Exception: pass at lines 211-212 silently ignores all errors from bind_dependencies. This could mask important configuration issues or bugs. At minimum, log the exception for debugging purposes.

🐛 Proposed fix
         try:
             saga.bind_dependencies(
                 producer=self._producer,
                 alloc_repo=self._alloc_repo,
                 publish_commands=bool(getattr(self._config, "publish_commands", False)),
             )
-        except Exception:
-            pass
+        except Exception as e:
+            self._logger.warning(f"Failed to bind dependencies for saga {saga_name}: {e}")
🧹 Nitpick comments (14)
backend/tests/unit/services/sse/test_shutdown_manager.py (1)

67-80: Consider using small non-zero timeouts to reduce test flakiness.

Setting notification_timeout=0.0 and force_close_timeout=0.0 can lead to unpredictable async behavior. The permissive assertion on line 80 (phase in ("draining", "complete", "closing", "notifying")) compensates for this but doesn't verify a specific transition path.

If the intent is to test rapid transitions, consider using small non-zero values (e.g., 0.001) to ensure the async machinery has minimal but consistent time to execute.

♻️ Suggested improvement
 m = SSEShutdownManager(
     drain_timeout=0.01,
-    notification_timeout=0.0,
-    force_close_timeout=0.0,
+    notification_timeout=0.001,
+    force_close_timeout=0.001,
     logger=_test_logger,
     connection_metrics=connection_metrics,
 )
backend/app/services/kafka_event_service.py (1)

4-4: Prefer lowercase dict for type hints in Python 3.9+.

The codebase uses modern Python syntax (e.g., str | None on line 44), so importing Dict from typing is unnecessary. Consider using dict[str, Any] directly for consistency.

♻️ Proposed fix
-from typing import Any, Dict
+from typing import Any

Then update all Dict[...] annotations to dict[...] throughout the file.

backend/app/services/idempotency/middleware.py (2)

21-23: Inconsistent typing: Using Set[str] but str for key_strategy.

If the intent is to accept string-based strategies, consider also using KeyStrategy enum with string conversion, or document the valid string values as constants. The current approach loses type safety without clear documentation of valid values.


5-5: Prefer lowercase set and dict for type hints.

Similar to the kafka_event_service.py feedback, Python 3.9+ supports lowercase generics directly.

♻️ Proposed fix
-from typing import Any, Awaitable, Callable, Dict, Set
+from typing import Any, Awaitable, Callable

Then use set[str], dict[...] throughout.

backend/tests/e2e/events/test_consume_roundtrip.py (1)

47-76: Align auto-commit with manual commits in the test loop.
Line 53 enables auto-commit while Line 76 commits manually; this can advance offsets before handler.handle completes. Consider disabling auto-commit when you explicitly commit.

🔧 Suggested fix
-        enable_auto_commit=True,
+        enable_auto_commit=False,
backend/app/services/sse/kafka_redis_bridge.py (1)

51-84: Return a copy of relevant event types and keep status ordering deterministic.

This prevents accidental mutation of the module-level set and avoids flaky status output ordering.

♻️ Proposed change
     `@staticmethod`
     def get_relevant_event_types() -> set[EventType]:
         """Get event types that should be routed to SSE.
 
         Helper for worker entrypoint to know which topics to subscribe to.
         """
-        return RELEVANT_EVENT_TYPES
+        return set(RELEVANT_EVENT_TYPES)
@@
     async def get_status(self) -> dict[str, list[str]]:
         """Get bridge status."""
         return {
-            "relevant_event_types": [str(et) for et in RELEVANT_EVENT_TYPES],
+            "relevant_event_types": sorted(str(et) for et in RELEVANT_EVENT_TYPES),
         }
backend/tests/e2e/events/test_event_dispatcher.py (1)

47-83: Disable auto-commit when manually committing offsets.

With enable_auto_commit=True and explicit commit() calls, auto-commit runs in the background every ~5000ms and may commit offsets before handler.handle() completes. This creates a race condition where offset commits become non-deterministic, potentially introducing flakiness in this test. Official aiokafka documentation recommends disabling auto-commit when using manual commit.

♻️ Proposed change
     kafka_consumer = AIOKafkaConsumer(
         topic,
         bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
         group_id=group_id,
-        enable_auto_commit=True,
+        enable_auto_commit=False,
         auto_offset_reset="earliest",
     )
backend/app/events/core/producer.py (1)

76-76: Consider lazy logging to avoid string formatting overhead.

Using f-strings in logging statements causes the string to be formatted even when the log level is disabled. For debug/error logs in hot paths, prefer lazy formatting.

Example using lazy formatting
-            self._logger.debug(f"Message [{event_to_produce}] sent to topic: {topic}")
+            self._logger.debug("Message [%s] sent to topic: %s", event_to_produce, topic)
-            self._logger.error(f"Failed to produce message: {e}")
+            self._logger.error("Failed to produce message: %s", e)

Also applies to: 83-83

backend/app/db/repositories/pod_state_repository.py (1)

77-100: track_pod overwrites created_at on re-tracking.

When tracking a pod that was already tracked, this will overwrite the original created_at timestamp. If preserving the original creation time is important, consider checking existence first or using HSETNX for the created_at field.

Alternative: preserve original created_at
     async def track_pod(
         self,
         pod_name: str,
         execution_id: str,
         status: str,
         metadata: dict[str, object] | None = None,
         ttl_seconds: int = 7200,
     ) -> None:
         """Track a pod's state."""
         key = f"{self.TRACKED_KEY_PREFIX}:{pod_name}"
         now = datetime.now(timezone.utc).isoformat()

+        # Check if already tracked to preserve original created_at
+        existing = await self._redis.hget(key, "created_at")
+        created_at = existing.decode() if existing else now
+
         data = {
             "pod_name": pod_name,
             "execution_id": execution_id,
             "status": status,
-            "created_at": now,
+            "created_at": created_at,
             "updated_at": now,
             "metadata": json.dumps(metadata) if metadata else "{}",
         }
backend/app/services/k8s_worker/worker.py (1)

283-283: Logging level should be DEBUG, not INFO.

The log message showing image name sanitization (before: {image_ref} -> {sanitized_image_ref}) is implementation detail useful for debugging, not operational info. Consider changing to DEBUG level.

Suggested change
-                self._logger.info(f"DAEMONSET: before: {image_ref} -> {sanitized_image_ref}")
+                self._logger.debug(f"Image name sanitized: {image_ref} -> {sanitized_image_ref}")
backend/app/services/event_bus.py (1)

42-42: Handler type annotation is too loose.

Using object for the handler type loses all type safety. Consider using a proper Callable type hint for better IDE support and type checking.

Suggested typing improvement
+from collections.abc import Awaitable, Callable
+from typing import Union
+
+EventHandler = Callable[[EventBusEvent], Union[None, Awaitable[None]]]
+
 `@dataclass`
 class Subscription:
     """Represents a single event subscription."""

     id: str = field(default_factory=lambda: str(uuid4()))
     pattern: str = ""
-    handler: object = field(default=None)
+    handler: EventHandler | None = field(default=None)

And update the subscribe signature:

-    async def subscribe(self, pattern: str, handler: object) -> str:
+    async def subscribe(self, pattern: str, handler: EventHandler) -> str:

Also applies to: 84-84

backend/app/services/saga/saga_orchestrator.py (1)

69-106: Consider extracting completion events to a class constant to avoid duplication.

The completion_events set is defined identically in both get_trigger_event_types (lines 81-85) and handle_event (lines 99-103). Extracting this to a class-level constant would prevent potential inconsistencies if one is updated without the other.

♻️ Suggested refactor
 class SagaOrchestrator:
     """Stateless saga orchestrator - pure event handler.
 
     No lifecycle methods (start/stop) - receives ready-to-use dependencies from DI.
     All state stored in SagaRepository. Worker entrypoint handles the consume loop.
     """
+    
+    _COMPLETION_EVENTS: frozenset[EventType] = frozenset({
+        EventType.EXECUTION_COMPLETED,
+        EventType.EXECUTION_FAILED,
+        EventType.EXECUTION_TIMEOUT,
+    })

Then use self._COMPLETION_EVENTS in both methods.

backend/app/services/coordinator/coordinator.py (1)

279-292: Hardcoded estimated_wait_seconds=None - consider calculating or documenting.

The estimated_wait_seconds field is set to None (line 286). If this is intentional for now, consider adding a TODO comment. Otherwise, this could be calculated from queue stats and average execution time to provide useful feedback to users.
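
If it should be calculated, a rough sketch of the kind of estimate the comment has in mind (queue_position, avg_execution_seconds, and active_workers are assumed inputs, not existing fields):

def estimate_wait_seconds(
    queue_position: int,
    avg_execution_seconds: float,
    active_workers: int,
) -> int:
    """Naive estimate: executions ahead of this one, drained by all workers in parallel."""
    if active_workers <= 0:
        return int(queue_position * avg_execution_seconds)
    return int((queue_position / active_workers) * avg_execution_seconds)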

backend/app/core/providers.py (1)

644-703: Config objects created inline with defaults - consider DI consistency.

K8sWorkerConfig (line 658) and PodMonitorConfig (line 694) are instantiated inline with default values. For consistency with other configuration patterns (e.g., SagaConfig which uses a factory function), consider extracting these to factory functions or providing them via the Settings object if configuration is needed.

This is a minor consistency suggestion - the current approach works if defaults are sufficient.
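
For illustration, a sketch of the factory-function style mirroring SagaConfig (the K8sWorkerConfig fields and Settings attributes mapped here are assumptions, not existing names):

from dishka import Provider, Scope, provide

# K8sWorkerConfig and Settings come from the application modules; the
# specific fields below are hypothetical.
def k8s_worker_config_from_settings(settings: Settings) -> K8sWorkerConfig:
    return K8sWorkerConfig(
        namespace=settings.K8S_NAMESPACE,
        pod_ttl_seconds=settings.K8S_POD_TTL_SECONDS,
    )


class K8sWorkerConfigProvider(Provider):
    scope = Scope.APP

    @provide
    def get_config(self, settings: Settings) -> K8sWorkerConfig:
        return k8s_worker_config_from_settings(settings)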

Comment on lines +131 to +170
class RedisRepositoryProvider(Provider):
    """Provides Redis-backed state repositories for stateless services."""

    scope = Scope.APP

    @provide
    def get_execution_state_repository(
        self, redis_client: redis.Redis, logger: logging.Logger
    ) -> ExecutionStateRepository:
        return ExecutionStateRepository(redis_client, logger)

    @provide
    def get_execution_queue_repository(
        self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
    ) -> ExecutionQueueRepository:
        return ExecutionQueueRepository(
            redis_client,
            logger,
            max_queue_size=10000,
            max_executions_per_user=100,
        )

    @provide
    async def get_resource_repository(
        self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
    ) -> ResourceRepository:
        repo = ResourceRepository(
            redis_client,
            logger,
            total_cpu_cores=32.0,
            total_memory_mb=65536,
        )
        await repo.initialize()
        return repo

    @provide
    def get_pod_state_repository(
        self, redis_client: redis.Redis, logger: logging.Logger
    ) -> PodStateRepository:
        return PodStateRepository(redis_client, logger)

🛠️ Refactor suggestion | 🟠 Major

Hardcoded resource limits should be configurable via Settings.

The RedisRepositoryProvider has hardcoded values for queue and resource limits:

  • max_queue_size=10000 (line 149)
  • max_executions_per_user=100 (line 150)
  • total_cpu_cores=32.0 (line 160)
  • total_memory_mb=65536 (line 161)

These should be pulled from Settings to allow configuration per environment without code changes.

♻️ Suggested refactor
     `@provide`
     def get_execution_queue_repository(
             self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
     ) -> ExecutionQueueRepository:
         return ExecutionQueueRepository(
             redis_client,
             logger,
-            max_queue_size=10000,
-            max_executions_per_user=100,
+            max_queue_size=settings.QUEUE_MAX_SIZE,
+            max_executions_per_user=settings.QUEUE_MAX_PER_USER,
         )

     `@provide`
     async def get_resource_repository(
             self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
     ) -> ResourceRepository:
         repo = ResourceRepository(
             redis_client,
             logger,
-            total_cpu_cores=32.0,
-            total_memory_mb=65536,
+            total_cpu_cores=settings.RESOURCE_TOTAL_CPU_CORES,
+            total_memory_mb=settings.RESOURCE_TOTAL_MEMORY_MB,
         )
         await repo.initialize()
         return repo

Add corresponding fields to your Settings class with sensible defaults.
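
A minimal sketch of those fields, assuming a pydantic-settings style Settings class (the project's actual base class and naming conventions may differ):

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # New knobs, with the current hardcoded values as defaults
    QUEUE_MAX_SIZE: int = 10000
    QUEUE_MAX_PER_USER: int = 100
    RESOURCE_TOTAL_CPU_CORES: float = 32.0
    RESOURCE_TOTAL_MEMORY_MB: int = 65536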

🤖 Prompt for AI Agents
In `@backend/app/core/providers.py` around lines 131 - 170, The provider currently
hardcodes limits in RedisRepositoryProvider; update
get_execution_queue_repository and get_resource_repository to read
max_queue_size, max_executions_per_user, total_cpu_cores, and total_memory_mb
from the Settings argument instead of hardcoded literals; add those fields with
sensible defaults to the Settings class (e.g., max_queue_size,
max_executions_per_user, total_cpu_cores, total_memory_mb), then pass
settings.max_queue_size and settings.max_executions_per_user into
ExecutionQueueRepository and settings.total_cpu_cores and
settings.total_memory_mb into ResourceRepository and keep existing initialize()
call.

Comment on lines +117 to +193
    async def allocate(
        self,
        execution_id: str,
        language: str,
        requested_cpu: float | None = None,
        requested_memory_mb: int | None = None,
        requested_gpu: int = 0,
    ) -> ResourceAllocation | None:
        """Allocate resources for execution. Returns allocation or None if insufficient."""
        # Check if already allocated
        alloc_key = f"{self.ALLOC_KEY_PREFIX}:{execution_id}"
        existing = await self._redis.hgetall(alloc_key)  # type: ignore[misc]
        if existing:
            self._logger.warning(f"Execution {execution_id} already has allocation")
            return ResourceAllocation(
                execution_id=execution_id,
                cpu_cores=float(existing.get(b"cpu", existing.get("cpu", 0))),
                memory_mb=int(existing.get(b"memory", existing.get("memory", 0))),
                gpu_count=int(existing.get(b"gpu", existing.get("gpu", 0))),
            )

        # Determine requested resources
        if requested_cpu is None or requested_memory_mb is None:
            default_cpu, default_memory = self.DEFAULT_ALLOCATIONS.get(language, (0.5, 512))
            requested_cpu = requested_cpu or default_cpu
            requested_memory_mb = requested_memory_mb or default_memory

        # Apply limits
        requested_cpu = min(requested_cpu, self._max_cpu_per_exec)
        requested_memory_mb = min(requested_memory_mb, self._max_memory_per_exec)

        # Atomic allocation using Lua script
        lua_script = """
        local pool_key = KEYS[1]
        local alloc_key = KEYS[2]
        local req_cpu = tonumber(ARGV[1])
        local req_memory = tonumber(ARGV[2])
        local req_gpu = tonumber(ARGV[3])
        local min_cpu = tonumber(ARGV[4])
        local min_memory = tonumber(ARGV[5])
        local avail_cpu = tonumber(redis.call('HGET', pool_key, 'available_cpu') or '0')
        local avail_memory = tonumber(redis.call('HGET', pool_key, 'available_memory') or '0')
        local avail_gpu = tonumber(redis.call('HGET', pool_key, 'available_gpu') or '0')
        local cpu_after = avail_cpu - req_cpu
        local memory_after = avail_memory - req_memory
        local gpu_after = avail_gpu - req_gpu
        if cpu_after < min_cpu or memory_after < min_memory or gpu_after < 0 then
            return 0
        end
        redis.call('HSET', pool_key, 'available_cpu', tostring(cpu_after))
        redis.call('HSET', pool_key, 'available_memory', tostring(memory_after))
        redis.call('HSET', pool_key, 'available_gpu', tostring(gpu_after))
        redis.call('HSET', alloc_key, 'cpu', tostring(req_cpu), 'memory', tostring(req_memory),
                   'gpu', tostring(req_gpu))
        redis.call('EXPIRE', alloc_key, 7200)
        return 1
        """

        result = await self._redis.eval(  # type: ignore[misc]
            lua_script,
            2,
            self.POOL_KEY,
            alloc_key,
            str(requested_cpu),
            str(requested_memory_mb),
            str(requested_gpu),
            str(self._min_reserve_cpu),
            str(self._min_reserve_memory),
        )

        if not result:

⚠️ Potential issue | 🔴 Critical

Move the allocation existence check inside the Lua script to prevent concurrent reservation for the same execution_id.

The current implementation checks for an existing allocation on line 128 before executing the Lua script, creating a race condition. Two concurrent callers can both pass the Python check (when no allocation exists yet) and then both execute the Lua script, resulting in duplicate allocations for the same execution_id. The Lua script at lines 149–179 lacks an existence check and only validates resource availability, making the check non-atomic.

Add an EXISTS check at the start of the Lua script and return a distinct code (e.g., 2) when allocation already exists. Then handle that result in Python to fetch and return the existing allocation:

🛠️ Suggested fix (atomic existence check in Lua)
@@
-        local req_gpu = tonumber(ARGV[3])
+        local req_gpu = tonumber(ARGV[3])
+        if redis.call('EXISTS', alloc_key) == 1 then
+            return 2
+        end
@@
-        if not result:
+        if result == 2:
+            existing = await self._redis.hgetall(alloc_key)  # type: ignore[misc]
+            self._logger.warning(f"Execution {execution_id} already has allocation")
+            return ResourceAllocation(
+                execution_id=execution_id,
+                cpu_cores=float(existing.get(b"cpu", existing.get("cpu", 0))),
+                memory_mb=int(existing.get(b"memory", existing.get("memory", 0))),
+                gpu_count=int(existing.get(b"gpu", existing.get("gpu", 0))),
+            )
+        if not result:
             pool = await self._redis.hgetall(self.POOL_KEY)  # type: ignore[misc]
🤖 Prompt for AI Agents
In `@backend/app/db/repositories/resource_repository.py` around lines 117 - 193,
The allocate method currently does a non-atomic Python-side existence check
before running the Lua script; move that existence check into the Lua script by
testing if alloc_key exists (redis.call('EXISTS', alloc_key)) at the top of
lua_script and return a distinct code (e.g., 2) when it does, keep returning 0
for insufficient resources and 1 for successful allocation; then update the
Python allocate method (the result handling after self._redis.eval) to treat
result == 2 by reading the existing allocation from alloc_key and returning a
ResourceAllocation built from those fields (same parsing logic used earlier),
otherwise proceed as before for success/failure.

Comment on lines +174 to +176
redis.call('HSET', alloc_key, 'cpu', tostring(req_cpu), 'memory', tostring(req_memory),
'gpu', tostring(req_gpu))
redis.call('EXPIRE', alloc_key, 7200)

⚠️ Potential issue | 🔴 Critical

Allocation TTL creates permanent pool capacity leak without recovery mechanism.

The 7200-second TTL on alloc_key (line 176) automatically expires the allocation record, but the decremented pool resources (lines 170-172) are never restored if release() is not called. Normal completions are handled via pod monitor events → coordinator release, but any failure path that prevents event publication (an undetected pod crash, a lost event, a coordinator crash before release) permanently leaks capacity.

No sweeper or reconciliation job exists to reclaim expired allocations. Either implement a background task to scan for expired alloc_key:* entries and restore pool resources, or add a TTL to the pool keys themselves with periodic reconciliation.
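
A rough sketch of such a reconciler, assuming the pool hash and alloc:<execution_id> key layout shown above; the function and its parameters are illustrative, and a Lua script would be needed to make the rewrite fully atomic:

import asyncio

from redis.asyncio import Redis


async def reconcile_pool(
    r: Redis,
    pool_key: str,
    alloc_prefix: str,
    total_cpu: float,
    total_memory_mb: int,
    interval_seconds: int = 300,
) -> None:
    """Periodically recompute available capacity from the live allocation keys."""
    while True:
        allocated_cpu, allocated_memory = 0.0, 0
        async for key in r.scan_iter(match=f"{alloc_prefix}:*"):
            alloc = await r.hgetall(key)
            allocated_cpu += float(alloc.get(b"cpu", 0))
            allocated_memory += int(alloc.get(b"memory", 0))
        # Expired alloc keys no longer show up in the scan, so their capacity
        # is returned to the pool here instead of leaking forever.
        await r.hset(pool_key, mapping={
            "available_cpu": str(total_cpu - allocated_cpu),
            "available_memory": str(total_memory_mb - allocated_memory),
        })
        await asyncio.sleep(interval_seconds)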

🤖 Prompt for AI Agents
In `@backend/app/db/repositories/resource_repository.py` around lines 174 - 176,
alloc_key entries are being set with a 7200s TTL but pool resource counters
(updated in the allocation code around alloc_key and the pool keys) are never
restored if release() isn't called, causing permanent capacity leaks; add a
reconciliation background task (e.g., reclaim_expired_allocations or
reconcile_expired_allocations) that periodically scans redis for keys matching
"alloc_key:*", detects expired or missing allocation metadata and atomically
restores the pool counters (cpu, memory, gpu) by incrementing the corresponding
pool hash fields, using Redis transactions or a Lua script to avoid races, and
mark or remove reclaimed alloc_key entries to make reclamation idempotent;
alternatively add TTLs to pool keys and implement a periodic reconcile_pools
task that verifies sums against active allocations and fixes
discrepancies—update resource_repository functions (alloc_key handling and
release()) to cooperate with the new reconciler.

Comment on lines +37 to +48
    kafka_consumer = await container.get(AIOKafkaConsumer)
    handler = await container.get(UnifiedConsumer)

    logger.info("ExecutionCoordinator started, consuming events...")

    async for msg in kafka_consumer:
        await handler.handle(msg)
        await kafka_consumer.commit()

    logger.info("ExecutionCoordinator shutdown complete")

    await container.close()

⚠️ Potential issue | 🟠 Major

Wrap the consumer loop in try/finally to ensure cleanup on exceptions.

Any exception in the async for loop (lines 42-44) skips await container.close(), leaving the Kafka consumer and other resources open. The codebase establishes a clear pattern—resources like AIOKafkaProducer, K8sClients, and IdempotencyManager all use try/finally for cleanup. Apply the same pattern here:

Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("ExecutionCoordinator shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        await kafka_consumer.stop()
+        await container.close()
+        logger.info("ExecutionCoordinator shutdown complete")
🤖 Prompt for AI Agents
In `@backend/workers/run_coordinator.py` around lines 37 - 48, The async consumer
loop in ExecutionCoordinator using kafka_consumer and handler can raise
exceptions which skip cleanup; wrap the async for msg in kafka_consumer loop in
a try/finally so that await container.close() (and any kafka_consumer
commit/close if needed) always runs, move the "ExecutionCoordinator shutdown
complete" log into the finally block, and ensure resource-specific cleanup for
AIOKafkaConsumer and UnifiedConsumer happens before closing the container
(referencing kafka_consumer, handler, container, AIOKafkaConsumer,
UnifiedConsumer).

Comment on lines +32 to +35
    # Service is HTTP-driven, wait for external shutdown
    await asyncio.Event().wait()

    await container.close()

⚠️ Potential issue | 🟠 Major

Unreachable container.close() - missing signal handling.

asyncio.Event().wait() blocks forever since the event is never set. Line 35 await container.close() will never execute, causing potential resource leaks when the process terminates.

Consider adding signal handling for graceful shutdown:

🔧 Proposed fix with signal handling
+import signal
+
 async def run_replay_service(settings: Settings) -> None:
     """Run the event replay service."""
 
     container = create_event_replay_container(settings)
-
     logger = await container.get(logging.Logger)
     logger.info("Starting EventReplayService with DI container...")
 
     db = await container.get(Database)
     await init_beanie(database=db, document_models=ALL_DOCUMENTS)
 
     logger.info("Event replay service initialized and ready")
 
-    # Service is HTTP-driven, wait for external shutdown
-    await asyncio.Event().wait()
-
-    await container.close()
+    # Wait for shutdown signal
+    shutdown_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+    for sig in (signal.SIGTERM, signal.SIGINT):
+        loop.add_signal_handler(sig, shutdown_event.set)
+
+    await shutdown_event.wait()
+    logger.info("Shutdown signal received, closing container...")
+    await container.close()
🤖 Prompt for AI Agents
In `@backend/workers/run_event_replay.py` around lines 32 - 35, The
forever-blocking await asyncio.Event().wait() prevents container.close() from
ever running; replace the anonymous Event with a named asyncio.Event (e.g.,
shutdown_event), register signal handlers for SIGINT/SIGTERM that set
shutdown_event (using loop.add_signal_handler or signal.signal fallback), await
shutdown_event.wait() in run_event_replay.py, then call await container.close()
after the wait so the container is closed during graceful shutdown; reference
the existing asyncio.Event().wait() and container.close() calls when making the
change.

Comment on lines +36 to +47
    kafka_consumer = await container.get(AIOKafkaConsumer)
    handler = await container.get(UnifiedConsumer)

    logger.info("KubernetesWorker started, consuming events...")

    async for msg in kafka_consumer:
        await handler.handle(msg)
        await kafka_consumer.commit()

    logger.info("KubernetesWorker shutdown complete")

    await container.close()

⚠️ Potential issue | 🟠 Major

Wrap the consumer loop in try/finally to guarantee cleanup on exceptions.

An exception in handle() or commit() will skip await container.close(), leaving resources open. The async for loop (lines 41-43) should be wrapped to ensure cleanup always executes.

🔧 Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("KubernetesWorker shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        await kafka_consumer.stop()
+        await container.close()
+        logger.info("KubernetesWorker shutdown complete")

This pattern affects all worker files (run_result_processor.py, run_saga_orchestrator.py, run_coordinator.py, run_pod_monitor.py).
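
One way to avoid repeating the fix in every entrypoint is a small shared helper along these lines (the helper name is illustrative; UnifiedConsumer comes from the application's events package, and container is assumed to be the dishka AsyncContainer returned by the create_*_container factories):

import logging

from aiokafka import AIOKafkaConsumer
from dishka import AsyncContainer


async def run_consumer_loop(container: AsyncContainer, logger: logging.Logger, name: str) -> None:
    """Consume until cancelled, always stopping the consumer and closing the container."""
    kafka_consumer = await container.get(AIOKafkaConsumer)
    handler = await container.get(UnifiedConsumer)
    logger.info("%s started, consuming events...", name)
    try:
        async for msg in kafka_consumer:
            await handler.handle(msg)
            await kafka_consumer.commit()
    finally:
        await kafka_consumer.stop()
        await container.close()
        logger.info("%s shutdown complete", name)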

🤖 Prompt for AI Agents
In `@backend/workers/run_k8s_worker.py` around lines 36 - 47, The consumer loop
can raise in handler.handle() or kafka_consumer.commit() and skip cleanup; wrap
the async for msg in kafka_consumer loop in a try/finally so that await
container.close() (and any other shutdown steps) always run. Specifically,
enclose the async for ...: await handler.handle(msg); await
kafka_consumer.commit() inside try: ... finally: await container.close() and
keep the logger.info("KubernetesWorker shutdown complete") inside the finally;
apply the same pattern to the other worker modules (run_result_processor.py,
run_saga_orchestrator.py, run_coordinator.py, run_pod_monitor.py) referencing
kafka_consumer, UnifiedConsumer.handle, kafka_consumer.commit, and
container.close where present.

Comment on lines +36 to +47
    kafka_consumer = await container.get(AIOKafkaConsumer)
    handler = await container.get(UnifiedConsumer)

    logger.info("ResultProcessor started, consuming events...")

    async for msg in kafka_consumer:
        await handler.handle(msg)
        await kafka_consumer.commit()

    logger.info("ResultProcessor shutdown complete")

    await container.close()

⚠️ Potential issue | 🔴 Critical

Wrap the consumer loop in try/finally to guarantee cleanup on failure and cancellation.

The consumer loop lacks exception/cancellation handling. If handle() raises or the consumer task is cancelled (e.g., via SIGTERM), the async for loop exits abnormally and await container.close() is never executed, leaving the consumer open. This pattern affects all worker files (run_pod_monitor.py, run_saga_orchestrator.py, run_k8s_worker.py, run_coordinator.py).

The producer already demonstrates correct cleanup with explicit stop() in a finally block. Apply the same pattern to the consumer:

🔧 Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("ResultProcessor shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        await kafka_consumer.stop()
+        await container.close()
+        logger.info("ResultProcessor shutdown complete")
🤖 Prompt for AI Agents
In `@backend/workers/run_result_processor.py` around lines 36 - 47, Wrap the
consumer loop in a try/finally so cleanup always runs: start by acquiring
kafka_consumer (AIOKafkaConsumer) and handler (UnifiedConsumer) as before, then
run the async for msg in kafka_consumer / await handler.handle(msg) loop inside
a try block; in the finally block ensure you stop/close the kafka_consumer (call
kafka_consumer.stop() or equivalent) and then await container.close(); re-raise
cancellation errors if needed so signal handling still works. This guarantees
the consumer and container are closed even if handler.handle() raises or the
task is cancelled.

Comment on lines +41 to +47
async for msg in kafka_consumer:
await handler.handle(msg)
await kafka_consumer.commit()

try:
# Wait for shutdown signal or service to stop
while orchestrator.is_running and not shutdown_event.is_set():
await asyncio.sleep(1)
finally:
# Container cleanup stops everything
logger.info("Initiating graceful shutdown...")
await container.close()
logger.info("SagaOrchestrator shutdown complete")

logger.warning("Saga orchestrator stopped")
await container.close()

⚠️ Potential issue | 🟠 Major

Missing graceful shutdown handling and error resilience.

Several issues with the main loop:

  1. No graceful shutdown: Signal handling (SIGTERM/SIGINT) was removed. In containerized environments, workers need to handle shutdown signals to stop consuming and exit cleanly.

  2. No error handling: If handler.handle(msg) raises an exception (which UnifiedConsumer.handle does after logging), the worker crashes. A single poison message could take down the worker.

  3. Container cleanup not guaranteed: container.close() won't be called if an exception occurs in the loop.

Proposed fix with shutdown handling and error resilience
+import signal
+from contextlib import suppress
+
 async def run_saga_orchestrator(settings: Settings) -> None:
     """Run the saga orchestrator service."""
 
     container = create_saga_orchestrator_container(settings)
     logger = await container.get(logging.Logger)
     logger.info("Starting SagaOrchestrator with DI container...")
 
     db = await container.get(Database)
     await init_beanie(database=db, document_models=ALL_DOCUMENTS)
 
     schema_registry = await container.get(SchemaRegistryManager)
     await initialize_event_schemas(schema_registry)
 
     kafka_consumer = await container.get(AIOKafkaConsumer)
     handler = await container.get(UnifiedConsumer)
 
+    shutdown_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+    for sig in (signal.SIGTERM, signal.SIGINT):
+        loop.add_signal_handler(sig, shutdown_event.set)
+
     logger.info("SagaOrchestrator started, consuming events...")
 
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("SagaOrchestrator shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            if shutdown_event.is_set():
+                break
+            try:
+                await handler.handle(msg)
+                await kafka_consumer.commit()
+            except Exception as e:
+                logger.exception(f"Error processing message: {e}")
+                # Message not committed - will be redelivered
+    finally:
+        logger.info("SagaOrchestrator shutting down...")
+        with suppress(Exception):
+            await kafka_consumer.stop()
+        await container.close()
+        logger.info("SagaOrchestrator shutdown complete")
🤖 Prompt for AI Agents
In `@backend/workers/run_saga_orchestrator.py` around lines 41 - 47, Add graceful
shutdown and per-message error resilience around the kafka_consumer loop:
install asyncio signal handlers for SIGINT/SIGTERM that set an asyncio.Event
(e.g., shutdown_event) and make the async for msg in kafka_consumer loop respect
that event so it stops cleanly; wrap message processing so each iteration calls
await handler.handle(msg) inside a try/except that logs the exception via
logger.error (including exception details) and continues (optionally still
commit offsets on successful processing via kafka_consumer.commit), and wrap the
whole consumer loop in try/finally to ensure await container.close() and any
kafka_consumer stop/close are always awaited on shutdown. Reference symbols:
kafka_consumer, handler.handle, kafka_consumer.commit, container.close, logger.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
backend/app/services/idempotency/middleware.py (2)

197-241: Dead code: async_handler is defined but never used.

The async_handler function is defined but never referenced. The dispatcher path (line 249) registers dispatch_handler instead, and the else branch (line 251-252) only logs an error. This entire function is orphaned code.

🐛 Suggested fix: remove dead code
-        # Create an async handler that processes the message
-        async def async_handler(message: Any) -> Any:
-            self.logger.info(f"IDEMPOTENT HANDLER CALLED for {event_type}")
-
-            # Extract event from confluent-kafka Message
-            if not hasattr(message, "value"):
-                self.logger.error(f"Received non-Message object for {event_type}: {type(message)}")
-                return None
-
-            # Debug log to check message details
-            self.logger.info(
-                f"Handler for {event_type} - Message type: {type(message)}, "
-                f"has key: {hasattr(message, 'key')}, "
-                f"has topic: {hasattr(message, 'topic')}"
-            )
-
-            raw_value = message.value
-
-            # Debug the raw value
-            self.logger.info(f"Raw value extracted: {raw_value[:100] if raw_value else 'None or empty'}")
-
-            # Handle tombstone messages (null value for log compaction)
-            if raw_value is None:
-                self.logger.warning(f"Received empty message for {event_type} - tombstone or consumed value")
-                return None
-
-            # Handle empty messages
-            if not raw_value:
-                self.logger.warning(f"Received empty message for {event_type} - empty bytes")
-                return None
-
-            try:
-                # Deserialize using schema registry if available
-                event = await self.consumer._schema_registry.deserialize_event(raw_value, message.topic)
-                if not event:
-                    self.logger.error(f"Failed to deserialize event for {event_type}")
-                    return None
-
-                # Call the idempotent wrapper directly in async context
-                await idempotent_wrapper(event)
-
-                self.logger.debug(f"Successfully processed {event_type} event: {event.event_id}")
-                return None
-            except Exception as e:
-                self.logger.error(f"Failed to process message for {event_type}: {e}", exc_info=True)
-                raise
-
         # Register with the dispatcher if available

If this code was intended to be used for direct consumer registration (the else branch), it should be wired up there. Otherwise, remove it to avoid confusion.


243-252: Incomplete fallback: else branch logs error but doesn't register the handler.

When no dispatcher is available, the code logs an error but doesn't actually register the handler with the consumer. If direct consumer registration is intended as a fallback, the previously defined async_handler (or equivalent) should be registered here. If direct registration isn't supported, consider raising an exception instead of silently failing.

♻️ Suggested fix: raise exception or implement fallback

Option 1 - Raise an exception if dispatcher is required:

         else:
-            # Fallback to direct consumer registration if no dispatcher
-            self.logger.error(f"No EventDispatcher available for registering idempotent handler for {event_type}")
+            raise RuntimeError(f"No EventDispatcher available for registering idempotent handler for {event_type}")

Option 2 - If direct consumer registration should work, implement it properly (requires understanding of UnifiedConsumer API).

backend/app/services/result_processor/processor.py (2)

88-94: Fragile memory limit parsing will fail for non-Mi suffixes.

rstrip("Mi") removes individual characters 'M' and 'i', not the literal string "Mi". While this works for "128Mi", it will:

  • Fail with ValueError for "1Gi" → "1G" (not a valid int)
  • Give incorrect results for "1G" → "1" (should be 1024 MiB)
  • Fail for "256Ki" or other Kubernetes resource quantity formats

Consider a proper parser:

Proposed fix
+def _parse_memory_to_mib(self, memory_str: str) -> float:
+    """Parse Kubernetes memory quantity to MiB."""
+    import re
+    match = re.match(r'^(\d+(?:\.\d+)?)\s*([KMGTPE]i?)?[bB]?$', memory_str)
+    if not match:
+        raise ValueError(f"Invalid memory format: {memory_str}")
+    value = float(match.group(1))
+    unit = match.group(2) or ''
+    multipliers = {
+        '': 1 / (1024 * 1024), 'K': 1 / 1024, 'Ki': 1 / 1024,
+        'M': 1, 'Mi': 1, 'G': 1024, 'Gi': 1024,
+    }
+    return value * multipliers.get(unit, 1)

Then use:

-            settings_limit = self._settings.K8S_POD_MEMORY_LIMIT
-            memory_limit_mib = int(settings_limit.rstrip("Mi"))
+            memory_limit_mib = self._parse_memory_to_mib(self._settings.K8S_POD_MEMORY_LIMIT)
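
For reference, the behaviour the sketched parser would give for common inputs (values in MiB):

# _parse_memory_to_mib("128Mi")  -> 128.0
# _parse_memory_to_mib("1Gi")    -> 1024.0
# _parse_memory_to_mib("1G")     -> 1024.0  (decimal G treated like Gi here)
# _parse_memory_to_mib("256Ki")  -> 0.25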

123-132: Exit code 0 incorrectly becomes -1.

Line 126: exit_code=event.exit_code or -1 treats 0 as falsy. If an execution fails with exit_code=0 (possible in some edge cases), this will incorrectly record -1.

Since ExecutionFailedEvent.exit_code is typed as int (not Optional[int]), the fallback may be unnecessary. If it's defensive against potential None values, use an explicit check:

Proposed fix
-            exit_code=event.exit_code or -1,
+            exit_code=event.exit_code if event.exit_code is not None else -1,
🧹 Nitpick comments (4)
backend/app/services/idempotency/middleware.py (2)

5-5: Inconsistent type hint style: mixing Dict/Set with list.

The file uses Dict and Set from typing (line 5) but also uses the builtin list for generic type hints (lines 141, 157). For consistency, either use all typing imports (List) or all builtins (dict, set) throughout.

♻️ Suggested fix for consistency

If targeting Python 3.9+, use builtin generics throughout:

-from typing import Any, Awaitable, Callable, Dict, Set
+from typing import Any, Awaitable, Callable

Then update:

-        fields: Set[str] | None = None,
+        fields: set[str] | None = None,
-        self._original_handlers: Dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]] = {}
+        self._original_handlers: dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]] = {}

Also applies to: 141-141, 157-157


146-150: Defensive check on non-optional parameter.

The guard if not self.dispatcher at line 148 checks a parameter that is typed as required (dispatcher: EventDispatcher). While defensive coding can be helpful, this creates inconsistency—if dispatcher can legitimately be None, the type hint should reflect that (dispatcher: EventDispatcher | None). Otherwise, this guard is misleading.

Either update the type hint to be explicit about allowing None:

-        dispatcher: EventDispatcher,
+        dispatcher: EventDispatcher | None,

Or trust the type system and remove the guard (relying on type checkers to catch invalid calls).

backend/app/services/result_processor/processor.py (2)

31-43: Unused configuration fields.

ResultProcessorConfig defines batch_size, processing_timeout, topics, and result_topic, but none of these are accessed anywhere in ResultProcessor. The stored self._config is never read after assignment.

If these are planned for future use, consider removing them until needed (YAGNI). If they should be used, wire them into the handler logic.


180-196: Consider extracting hardcoded service_version.

"1.0.0" is duplicated on lines 182 and 195. Extracting this to a constant or config would simplify version bumps and ensure consistency.
